package com.zork.data.generator.config;

import cn.hutool.core.util.StrUtil;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;

import java.util.Map;

/**
 * @author xiesen
 */
@Configuration
public class CustomKafkaConfig {
    /**
     * 生产者
     */
    @Bean("kafkaStringTemplate")
    public KafkaTemplate<String, Object> kafkaStringTemplate(
            @Autowired @Qualifier("kafkaProperties") KafkaProperties customKafkaProperties) {
        return new KafkaTemplate<>(producerFactory(customKafkaProperties));
    }

    private ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
        kafkaProperties.getProducer().setAcks(StrUtil.isBlank(kafkaProperties.getProducer().getAcks()) ? "all" : kafkaProperties.getProducer().getAcks());
        Map<String, Object> properties = kafkaProperties.buildProducerProperties();
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization" +
                ".StringSerializer");
        return new DefaultKafkaProducerFactory<>(properties);
    }


    @Primary
    @ConfigurationProperties(prefix = "spring.kafka")
    @Bean
    public KafkaProperties kafkaProperties() {
        return new KafkaProperties();
    }

    @Primary
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate(@Autowired ProducerFactory<?, ?> kafkaProducerFactory, @Autowired KafkaProperties kafkaProperties,
                                             ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(kafkaProperties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

}
